package media

import (
	"errors"
	"gitee.com/yangzx6606/core/pattern"
	"gitee.com/yangzx6606/core/pb"
	"github.com/deepch/vdk/av"
	"github.com/deepch/vdk/codec/h264parser"
	"github.com/deepch/vdk/format/rtspv2"
	"google.golang.org/protobuf/types/known/anypb"
	"time"
)

type RtspPuller struct {
	pattern.AbstractSubject[*VideoPacket]
	url         string
	client      *rtspv2.RTSPClient
	isOpened    bool
	isConnected bool
	codec       []av.CodecData
	videoCodec  av.VideoCodecData
}

func NewRtspPuller(url string) *RtspPuller {
	return &RtspPuller{url: url}
}

func (r *RtspPuller) Open() error {
	if r.isOpened {
		return errors.New("已打开")
	}
	r.isOpened = true
	go r.doWork()
	return nil
}

func (r *RtspPuller) Close() {
	if r.isOpened {
		r.isOpened = false
		r.client.Close()
	}
}

func (r *RtspPuller) doWork() {
	keyTest := time.NewTimer(20 * time.Second)
	for r.isOpened {
		if r.tryConnect() != nil {
			time.Sleep(time.Second * 3)
			continue
		}
		select {
		case <-keyTest.C:
			r.client.Close()
			r.isConnected = false
			r.Notify(&VideoPacket{Type: VideoPacketTypeConnectLost})

		case signals := <-r.client.Signals:
			switch signals {
			case rtspv2.SignalCodecUpdate:
				r.codec = r.client.CodecData
				for _, v := range r.codec {
					if v.Type().IsVideo() {
						r.videoCodec = v.(av.VideoCodecData)
					}
				}
			case rtspv2.SignalStreamRTPStop:
				r.client.Close()
				r.isConnected = false
				r.Notify(&VideoPacket{Type: VideoPacketTypeConnectLost})
			}
		case packet := <-r.client.OutgoingPacketQueue:
			frame := &pb.MediaFrame{
				IsKeyFrame:     packet.IsKeyFrame,
				Content:        packet.Data,
				FrameTimestamp: int64(packet.Time),
				Timestamp:      time.Now().UnixMilli(),
			}
			data, _ := anypb.New(frame)
			if packet.IsKeyFrame {
				keyTest.Reset(time.Second * 20)
				r.notifyCodec()
				r.Notify(&VideoPacket{Type: VideoPacketTypeKeyFrame, Packet: &pb.Packet{Type: pb.PacketType_PacketType_MediaFrame, Data: data}})
			} else {
				r.Notify(&VideoPacket{Type: VideoPacketTypeFrame, Packet: &pb.Packet{Type: pb.PacketType_PacketType_MediaFrame, Data: data}})
			}
		}
	}
}

func (r *RtspPuller) tryConnect() error {
	if r.isConnected {
		return nil
	}
	var err error
	r.client, err = rtspv2.Dial(rtspv2.RTSPClientOptions{
		URL:              r.url,
		DisableAudio:     true,
		DialTimeout:      3 * time.Second,
		ReadWriteTimeout: 3 * time.Second,
		Debug:            true,
	})
	if err != nil {
		return err
	}
	r.codec = r.client.CodecData
	for _, v := range r.codec {
		if v.Type().IsVideo() {
			r.videoCodec = v.(av.VideoCodecData)
		}
	}
	r.isConnected = true
	r.Notify(&VideoPacket{Type: VideoPacketTypeConnected})
	return err
}

func (r *RtspPuller) notifyCodec() {
	codec := r.videoCodec.(h264parser.CodecData)
	codecFrame := &pb.Codec{
		PPS: codec.PPS(),
		SPS: codec.SPS(),
		FPS: int32(codec.FPS()),
	}
	data, _ := anypb.New(codecFrame)
	r.Notify(&VideoPacket{Type: VideoPacketTypeCodec, Packet: &pb.Packet{Type: pb.PacketType_PacketType_CodecFrame, Data: data}})
}
